07 事件流
一、什么是事件流
以前在LangChain中,使用 stream() 时,不同类型的数据消息、状态更新、自定义事件会混在一起返回,开发者需要自己判断数据类型并解析。在LangChain v1.0中,推荐使用 stream_events(version="v3")。它返回一个 Run 对象,里面已经按类型拆分成了多个流:这样前端或业务代码可以直接消费自己关心的数据,不用再从整个Strem事件流中手动筛选数据。
在LangChain中,使用stream_events方法来获取事件流示例如下:
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""查询城市天气"""
return f"{city}今天晴天,气温28°C~35°C"
agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])
# 使用stream_events获取事件流,version="v3"是固定写法
stream = agent.stream_events(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
version="v3"
)
# 流式输出模型回复
for message in stream.messages:
for delta in message.text:
print(delta, end="", flush=True)
# 获取最终状态
final_state = stream.output二、流对象的类型
stream_events返回的流对象提供了多种"投影",每种投影对应不同类型的内容:
| 投影 | 用途 |
|---|---|
stream.messages | 模型消息流,每次LLM调用产生一个 |
message.text | 文本增量和最终文本 |
message.reasoning | 推理内容(部分模型支持,比如DeepSeek的深度思考) |
message.tool_calls | 工具调用的参数块和最终调用结果 |
stream.tool_calls | 工具执行的完整生命周期,包括输入、输出、错误等 |
stream.values | Agent状态快照 |
stream.output | 最终Agent状态 |
stream.subgraphs | 嵌套图运行(子Agent场景) |
stream.extensions | 自定义投影 |
简单来说,stream.messages用于获取消息列表,stream.tool_calls获取工具调用的过程,stream.values获取Agent的状态数据。
三、流式输出模型消息
最常见的场景就是流式输出模型的回复,让用户看到"打字机"效果:
stream = agent.stream_events(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
version="v3"
)
for message in stream.messages:
# 输出当前是哪个节点产生的消息
print(f"[{message.node}] ", end="")
# 流式输出文本增量
for delta in message.text:
print(delta, end="", flush=True)
# 获取完整消息和token用量
full_message = message.output
usage = full_message.usage_metadata
if usage:
print(f"\ntoken用量: {usage}")message.text是逐字输出的增量,如果你想一次性拿到完整文本,可以用str(message.text)。
四、流式输出推理过程
有些模型支持"深度思考",比如DeepSeek的思考模式。这时候模型会先输出一段推理过程,再输出最终答案。通过message.reasoning可以拿到推理部分:
stream = agent.stream_events(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
version="v3"
)
for message in stream.messages:
# 输出推理过程
for delta in message.reasoning:
print(f"[思考] {delta}", end="", flush=True)
# 输出最终回答
for delta in message.text:
print(delta, end="", flush=True)输出效果类似:
[思考] 用户问的是杭州天气,我需要调用get_weather工具...
杭州今天晴天,气温28°C~35°C,适合出门。五、流式输出工具调用
工具调用的流式输出分为两个阶段:
阶段一:模型生成工具调用参数
当LLM决定要调用工具时,它会生成工具名和参数。这个过程可以通过message.tool_calls来流式获取:
stream = agent.stream_events(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
version="v3"
)
for message in stream.messages:
# 流式获取工具调用参数的生成过程
for chunk in message.tool_calls:
print(f"工具调用参数块: {chunk}")
# 获取最终确定的工具调用
finalized = message.tool_calls.get()
if finalized:
print(f"最终工具调用: {finalized}")阶段二:工具执行过程
工具开始执行后,可以通过stream.tool_calls获取执行的完整生命周期:
stream = agent.stream_events(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
version="v3"
)
for call in stream.tool_calls:
print(f"调用工具: {call.tool_name}")
print(f"输入参数: {call.input}")
# 流式输出工具执行结果
for delta in call.output_deltas:
print(delta, end="", flush=True)
# 获取最终输出和错误信息
print(f"输出: {call.output}")
print(f"错误: {call.error}")六、流式输出状态
有时候你需要实时监控Agent的状态变化,比如短期记忆中的消息列表变化:
stream = agent.stream_events(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
version="v3"
)
# 每次状态变化都会产生一个快照
for snapshot in stream.values:
print(snapshot)
# 获取最终状态
final_state = stream.output
print(final_state["messages"][-1].content)七、子Agent的流式输出
在多Agent协作的场景中,一个Agent可能会调用另一个Agent。这时候子Agent的事件流会通过stream.subgraphs暴露出来:
from langchain.agents import create_agent
# 定义天气Agent
def get_weather(city: str) -> str:
"""查询城市天气"""
return f"{city}今天晴天,气温28°C~35°C"
weather_agent = create_agent(
model="deepseek-v4-flash",
tools=[get_weather],
name="weather_agent",
)
# 定义一个工具,内部调用天气Agent
def call_weather(query: str) -> str:
"""调用天气Agent查询天气"""
result = weather_agent.invoke({"messages": [{"role": "user", "content": query}]})
return result["messages"][-1].text
# 定义主Agent
supervisor = create_agent(
model="deepseek-v4-flash",
tools=[call_weather],
name="supervisor",
)
stream = supervisor.stream_events(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
version="v3",
)
# 遍历子Agent的流
for subagent in stream.subgraphs:
if subagent.graph_name != "weather_agent":
continue
print(f"[{subagent.graph_name}] ", end="")
for message in subagent.messages:
for token in message.text:
print(token, end="", flush=True)
print()通过subagent.graph_name可以区分不同的子Agent,只处理你关心的那一个。
八、同时消费多种流
在实际应用中,你可能同时需要模型消息和工具调用的流。在异步代码中可以用asyncio.gather并发消费:
import asyncio
stream = await agent.astream_events(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
version="v3"
)
async def consume_messages():
async for message in stream.messages:
print(await message.text)
async def consume_tool_calls():
async for call in stream.tool_calls:
print(call.tool_name, call.input)
await asyncio.gather(consume_messages(), consume_tool_calls())在同步代码中,可以用stream.interleave交替消费:
stream = agent.stream_events(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
version="v3"
)
for name, item in stream.interleave("messages", "tool_calls", "values"):
if name == "messages":
print(item.text)
elif name == "tool_calls":
print(item.tool_name, item.input)
elif name == "values":
print(item)九、自定义流式投影
如果内置的投影不能满足需求,比如你想输出检索进度、自定义事件等,可以通过自定义transformer来扩展:
stream = agent.stream_events(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
version="v3",
transformers=[ToolActivityTransformer],
)
for activity in stream.extensions["tool_activity"]:
print(activity)也可以在中间件中注册transformer:
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware
class ToolActivityMiddleware(AgentMiddleware):
transformers = (ToolActivityTransformer,)
agent = create_agent(
model="deepseek-v4-flash",
tools=[get_weather],
middleware=[ToolActivityMiddleware()],
)十、总结
事件流是LangChain中实现流式输出的核心机制,通过stream_events(version="v3")获取事件流后,可以根据需要选择不同的投影来消费:
- 想要模型回复的"打字机"效果?用
stream.messages配合message.text - 想看工具调用过程?用
message.tool_calls和stream.tool_calls - 想监控Agent状态?用
stream.values - 多Agent场景?用
stream.subgraphs - 需要自定义输出?用
stream.extensions
在下一篇文章中,我们将学习如何在LangChain中实现更细粒度的流式输出控制。